package fun.easycode.datastream;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import fun.easycode.datastream.api.TaskPageQry;
import fun.easycode.datastream.repository.Task;
import fun.easycode.datastream.repository.TaskState;
import fun.easycode.datastream.util.MyBatisPlusUtil;
import fun.easycode.jointblock.core.CheckException;
import fun.easycode.jointblock.core.DynamicOperate;
import fun.easycode.jointblock.core.PageDto;
import fun.easycode.jointblock.core.QueryDslParser;
import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;
import fun.easycode.datastream.repository.TaskRepository;

/**
 * 全量数据处理管理器
 *
 * @author xuzhen97
 */
@Slf4j
public class DataCompleteManager {

    private final TaskRepository taskRepository;
    private final DataCompleteStartProducer dataCompleteStartProducer;


    /**
     * 构造函数
     *
     * @param taskRepository           任务仓库
     * @param dataCompleteStartProducer 任务启动生产者
     */
    public DataCompleteManager(TaskRepository taskRepository, DataCompleteStartProducer dataCompleteStartProducer) {
        this.taskRepository = taskRepository;
        this.dataCompleteStartProducer = dataCompleteStartProducer;
    }

    /**
     * 创建任务
     *
     * @param processor 数据处理器
     * @param queryDsl  查询条件
     * @param <T>       数据类型
     * @return 任务
     */
    @Transactional(rollbackFor = Exception.class)
    public <T> Task createTask(IDataProcessor<T> processor, String queryDsl) {

        Task task = new Task();
        task.setBaseEntity();

        task.setState(TaskState.NOT_START);

        task.setProcessorName(processor.getMark());

        if (!StringUtils.isEmpty(queryDsl)) {
            QueryWrapper<?> queryWrapper = QueryDslParser.parser(queryDsl).build();
            String customSql = MyBatisPlusUtil.getCustomSqlSegment(queryWrapper);
            log.info("customSql: {}", customSql);
            task.setParam(customSql);
        }
        taskRepository.save(task);
        return task;
    }

    /**
     * 启动任务
     *
     * @param taskId 任务id
     */
    @Transactional(rollbackFor = Exception.class)
    public void startTask(String taskId) {
        Task task = taskRepository.getById(taskId);

        if (task == null) {
            throw new CheckException("任务不存在");
        }

        if (taskRepository.waitTask(task.getId())) {
            dataCompleteStartProducer.produce(task.getId());
            log.info("任务进入等待消息队列成功, taskId: {}", taskId);
        }
    }

    /**
     * 分页查询任务
     * @param qry 查询条件
     * @return 任务分页列表
     */
    public PageDto<Task> taskPageQry(TaskPageQry qry){
        return  DynamicOperate.page(qry, taskRepository.getBaseMapper(), (task) -> task);
    }
}
